Lambda+Athenaでモンテカルロ法をサーバーレスで実行
概要
今回はモンテカル法をAWS Lambdaを用いて同時に実行してみようと思います。
モンテカルロ法は同時実行できる数が多ければその分早く終わるので、今回はLambdaを使用して早く処理を終わらせたいとおもいます。
モンテカルロ法はWikipediaによれば以下のような感じらしいです。
モンテカルロ法(モンテカルロほう、(英: Monte Carlo method、MC)とはシミュレーションや数値計算を乱数を用いて行う手法の総称。
Lambdaは使用時間に対して課金されるため、同時実行数を高めれば、同じ料金で処理時間を短くすることができます。
この図中の四角の部分の面積は変わらないので、同時実行数が低い場合とくらべて料金は等しく、処理時間は短くなっています。
今回扱う問題
今回は円周率をモンテカルロ法を用いて求めてみたいと思います。
xを[-1, 1], yを[-1, 1]の区間でランダムな点をおきます。
ここで、辺の長さが2である正方形とそれに内接する円を考えます。
このとき、その点が円に含まれている確率は円の面積(S(circle))と正方形の面積(S(square))の比率に一致するはずです。
ここで円の半径は1、正方形の一辺は2なので、各面積は以下のように求められます。
ランダムな点が円に含まれている確率P(A)は次のようになります。
N回試行(サンプリング)を行って円に含まれている回数をAとすると、試行回数を増やしていけばA/NはP(A)に一致するはずです。 よって円周率πは次のように求められるはずです。
Pythonで書くとこんな感じです。
from random import uniform def sampling_pi(n): counter = 0 for _ in range(n): x = uniform(-1, 1) y = uniform(-1, 1) if x*x + y*y < 1: counter += 1 pi = 4.0 * counter / n return pi def main(): result = sampling_pi(100000) print(f'pi: {result}') if __name__ == "__main__": main()
アーキテクチャ
今回はLambdaからSNSにサンプリングのジョブを発行して、Lambdaでサンプリングを行い、結果をS3に保存し、Athenaで結果を確認します。
SNSでメッセージキューのように扱います。
AthenaはS3などに保存されているCSVなどのデータに対してSQL文で処理をできます。
ジョブを発行するLambdaのコードは以下のような感じです。 今回は複数回のサンプリングを1度のLambdaの実行で行います。 これはLambdaの呼び出し回数を減らしてオーバーヘッドを減らすなどの狙いがあります。
変数名 | 説明 |
---|---|
job_id |
ジョブの識別子 |
job_num |
何回Lambdaを実行するか |
index |
バッチのインデックス |
batch_size |
一度のLambdaの実行でサンプリングを回す回数 |
つまりトータルで行われるサンプリングはjob_num x batch_sizeになります。
from os import environ import boto3 import json import uuid TOPIC_ARN = environ.get('TOPIC_ARN') sns = boto3.resource('sns') topic = sns.Topic(TOPIC_ARN) def lambda_handler(event, context): job_id = uuid.uuid4().hex job_num = event['job_num'] batch_size = event['batch_size'] for i in range(job_num): topic.publish( Subject="Montecalro sampling job", Message=json.dumps({ "job_id": job_id, "index": i, "batch_size": batch_size }) )
サンプリングをするコードは次のような感じです。 Athenaは対象が小さなファイルの場合、読み込みがオーバーヘッドになってしまうため、複数回のサンプリング結果をまとめて一つのCSVとして出力しています。 今回は円の中にあった場合を1、それ以外を0としています。
from random import uniform from os import environ import csv import json from io import BytesIO, TextIOWrapper import boto3 BUCKET_NAME = environ.get('BUCKET_NAME') s3 = boto3.resource('s3') bucket = s3.Bucket(BUCKET_NAME) def sampling_pi(job_id, index, n): with BytesIO() as f: w = TextIOWrapper(f) writer = csv.DictWriter(w, ["is_inside"]) writer.writeheader() for _ in range(n): x = uniform(-1, 1) y = uniform(-1, 1) writer.writerow({ "is_inside": 1 if x*x + y*y < 1 else 0 }) w.seek(0) bucket.upload_fileobj(f, f'id={job_id}/{index}.csv') def lambda_handler(event, context): message = event['Records'][0]['Sns']['Message'] message = json.loads(message) job_id = message['job_id'] index = message['index'] batch_size = message['batch_size'] sampling_pi(job_id, index, batch_size)
実行してみる
今回はマネジメントコンソールからLambdaを実行してみましょう。
S3を見ると完了したことが確認できました。
CSVの中身はこんな感じです。
is_inside |
---|
0 |
1 |
0 |
1 |
1 |
0 |
0 |
最後にAthenaを使って確認しましょう。
データベースを作り、新しいテーブルを作ります。今回はjob_id
でパーティションを切っています。
テーブルが作成できたので平均を求めて、円周率を計算しましょう。
ここでは円周率は4A/Nであるため、平均を求め4倍すればいいです。
精度はまだまだ低いですがそれらしき値が出てきました。
せっかくなので回数を増やしてみましょう。
1000実行、各10000回のサンプリングです。
つまりは1000x10000=10000000回分のサンプリングです。
大体1分くらいで処理は終わりました。(Lambdaのタイムアウト時間を伸ばしました)
前回よりも真の値に収束してきていることが確認できますね
感想
今回はモンテカル法をLambdaを使って行いました。 ただ、サンプリング部分と分析部分にはまだ議論の余地があると思います。 というのも、Lambda以外でもAWS Batchなどを用いても同様のことができ、様々な選択肢があるためです。 また、分析部分についてもSQLやDynamoDBを使ってもいいと思います。
そんな中で今回のアーキテクチャを使うメリットは個人的には導入コストの低さだと思います。 Batch処理のフレームワークを理解しなくてもある程度は使える点がいいかなと思います。 あとは起動の速さでしょうか。 Lambdaは高速に起動するので場合によっては早く結果を受け取ることができるでしょう。
ただ、やはりAWS Batchなどと比べると構築の手間やコンピューティングリソースの制限(実行時間)などの観点から課題が残ります。
また、S3、Athenaを使うことでサーバーレスで構築することができました。 Athenaは処理量に応じてスケーリングしてくれるので、そのあたりを考えなくてもいいのもメリットです。
タスクに応じて最適なアーキテクチャを採用する必要があるでしょう。 機会があれば別のアーキテクチャも試してみようと思います。
注意点
Lambdaの同時実行数には制限がある
Lambdaの同時実行数はデフォルトで1000です。
ただ、数十万までは引き上げることができるようです。
Lambdaの実行時間には制限がある
Lambdaのタイムアウトは最大でも15分です。
サービスの料金に気をつける
例えばS3はPUTリクエスト1000回あたり0.0047USDがかかります。(2021/6/22)
今回の例で言えばjob_num
を10000にすると0.047USDかかります。
タスクの性質的に各サービスのコール回数が多くなってる傾向があると思うので、その点には気をつけた方が良いでしょう。
SNSやLambdaも呼び出しごとに料金はかかります。
また、Athenaはスキャンするデータ量によって課金がされます。